// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

package cn.org.dbtools.star.stack.connector;

import cn.org.dbtools.star.stack.entity.ClusterInfoEntity;
import cn.org.dbtools.star.stack.exception.StarRequestException;
import cn.org.dbtools.star.stack.constant.ConstantDef;
import cn.org.dbtools.star.stack.model.palo.TableSchemaInfo;
import lombok.extern.slf4j.Slf4j;
import cn.org.dbtools.star.stack.model.response.construct.NativeQueryResp;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

@Component
@Slf4j
public class StarMetaInfoClient extends StarClient {
    protected HttpClientPoolManager poolManager;

    @Autowired
    public StarMetaInfoClient(HttpClientPoolManager poolManager) {
        this.poolManager = poolManager;
    }

    @Autowired
    private StarQueryClient queryClient;

    public List<String> getFeList(ClusterInfoEntity entity) throws Exception {
        String feQuery = "SHOW FRONTENDS";
        return getMetaQueryResult(feQuery, ConstantDef.STAR_DEFAULT_NS, ConstantDef.STAR_DEFAULT_SCHEMA, entity);
    }

    public List<String> getBeList(ClusterInfoEntity entity) throws Exception {
        String beQuery = "SHOW BACKENDS";
        return getMetaQueryResult(beQuery, ConstantDef.STAR_DEFAULT_NS, ConstantDef.STAR_DEFAULT_SCHEMA, entity);
    }

    public List<String> getActiveFeList(ClusterInfoEntity entity) throws Exception {
        List<List<String>> feList = getFeInfoList(entity);
        List<String> activeFeList = new ArrayList<>();
        for (List<String> fe : feList) {
            if (fe.get(9).equalsIgnoreCase("true")) {
                activeFeList.add(fe.get(1));
            }
        }
        return activeFeList;
    }

    public List<List<String>> getFeInfoList(ClusterInfoEntity entity) throws Exception {
        String querySql = "SHOW PROC '/frontends';";
        log.debug("Send get backends list for query, sql is {}.", querySql);
        NativeQueryResp result = queryClient.executeSQL(querySql, ConstantDef.STAR_DEFAULT_NS, ConstantDef.STAR_DEFAULT_SCHEMA, entity);

        if (result.getData().isEmpty()) {
            throw new StarRequestException("Get be info error.");
        }
        return result.getData();
    }

    public List<String> getActiveBeList(ClusterInfoEntity entity) throws Exception {
        List<List<String>> beList = getBeInfoList(entity);
        List<String> activeBeList = new ArrayList<>();
        for (List<String> fe : beList) {
            if (fe.get(8).equalsIgnoreCase("true")) {
                activeBeList.add(fe.get(1));
            }
        }
        return activeBeList;
    }

    public List<List<String>> getBeInfoList(ClusterInfoEntity entity) throws Exception {
        String querySql = "SHOW PROC '/backends';";
        log.debug("Send get backends list for query, sql is {}.", querySql);
        NativeQueryResp result = queryClient.executeSQL(querySql, ConstantDef.STAR_DEFAULT_NS, ConstantDef.STAR_DEFAULT_SCHEMA, entity);

        if (result.getData().isEmpty()) {
            throw new StarRequestException("Get be info error.");
        }
        return result.getData();
    }

    public List<String> getDatabaseList(String nsName, ClusterInfoEntity entity) throws Exception {
        String dbQuery = String.format("SHOW DATABASES FROM %s ;", nsName);
        return getMetaQueryResult(dbQuery, ConstantDef.STAR_DEFAULT_NS, ConstantDef.STAR_DEFAULT_SCHEMA, entity);
    }

    public List<String> getTableList(String nsName, String dbName, ClusterInfoEntity entity) throws Exception {
        String tableQuery = String.format("SHOW TABLES FROM %s.%s", nsName, dbName);
        return getMetaQueryResult(tableQuery, nsName, dbName, entity);
    }

    public List<String> getAllTableList(ClusterInfoEntity entity) throws Exception {
        String tableQuery = String.format("SELECT TABLE_NAME FROM INFORMATION_SCHEMA.TABLES "
                + "GROUP BY TABLE_SCHEMA,TABLE_NAME;");
        return getMetaQueryResult(tableQuery, ConstantDef.STAR_DEFAULT_NS, ConstantDef.STAR_DEFAULT_SCHEMA, entity);
    }

    public TableSchemaInfo.TableSchema getTableBaseSchema(String ns, String db, String table,
                                                          ClusterInfoEntity entity) throws Exception {
        String schemaQuery = String.format("DESC %s.%s.%s ALL;", ns, db, table);
        List<List<String>> result = queryClient.executeSQL(schemaQuery, ns, db, entity).getData();

        TableSchemaInfo.TableSchema tableSchema = new TableSchemaInfo.TableSchema();
        tableSchema.setBaseIndex(true);
        tableSchema.setKeyType(result.get(0).get(1));

        List<TableSchemaInfo.Schema> schemaList = new ArrayList<>();
        for (List<String> fieldEntity : result) {
            TableSchemaInfo.Schema schema = new TableSchemaInfo.Schema();
            schemaStructMap(fieldEntity, schema);
            schemaList.add(schema);
        }
        tableSchema.setSchema(schemaList);

        return tableSchema;
    }

    private void schemaStructMap(List<String> fieldEntity, TableSchemaInfo.Schema schema) {
        schema.setField(fieldEntity.get(2));
        schema.setType(fieldEntity.get(3));
        schema.setIsNull(fieldEntity.get(4));
        schema.setKey(fieldEntity.get(5));
        schema.setDefaultVal(fieldEntity.get(5));
        schema.setComment("");
        schema.setAggrType("");
    }

    private List<String> getMetaQueryResult(String querySql, String nsName, String db, ClusterInfoEntity entity) throws Exception {
        log.debug("Send get meta info query, sql is {}.", querySql);
        NativeQueryResp result = queryClient.executeSQL(querySql, nsName, db, entity);

        if (result.getData().isEmpty()) {
            log.info("Get meta info result is empty, sql is {}", querySql);
        }
        return result.getData().stream().map(row -> row.get(0)).collect(Collectors.toList());
    }
}
